---
title: 11 Logging
---

从这篇教程开始，我们为代理增加更多的功能。本篇是 logging，对请求响应的相关信息进行记录。包括请求、响应的原始内容，以及在二者基础上的一些统计信息。

## 问题

既然是 logging，能记录的信息当然是越多越好，方便调试和统计。比如：

* 请求信息
  * 请求头
  * 请求消息体
* 响应信息
  * 响应头
  * 响应消息体
* 请求发送时间
* 响应收到时间
* 响应接收完成时间
* 下游 ip 地址和端口
* 本地 ip 地址和端口
* 扩展信息

基本上，在前面的教程里我们也都有提及以上信息的获取方式。为了方便机器可读，这些数据封装成 JSON 格式并上报。

## 日志插件

功能的实现继续使用[插件](/tutorial/05-plugins)的方式进行扩展。要采集的信息分别要在请求发起前和响应收到后，在[插件](/tutorial/05-plugins)那篇教程中，通过 *use* 过滤器引入插件时，我们预留了 *response* 子管道：

```js
.pipeline('request')
  .use(
    [
      'plugins/router.js',
      'plugins/default.js',
    ],
    'request',
    'response',
    () => __turnDown
  )
```

各插件的执行顺序为：

* request: A > B > C
* response: C > B > A

写插件前，先引入插件（logger.js）吧：

```js
{
  "listen": 8000,
  "plugins": [
    "plugins/router.js",
    "plugins/logger.js",
    "plugins/balancer.js",
    "plugins/default.js"
  ]
}
```

同样也需要对应的配置文件，用来配置日志上报的“位置”。为了方便测试，后面测试时我们会 mock 一个服务。

```js
{
  "logURL": "http://127.0.0.1:8123/log"
}
```

> 这里直接将采集后的日志，通过网络批量**直接**发送到存储中，比如 ElasticSearch、ClickHouse。在网络状况够好的情况下，直接入库相比写入磁盘再采集上报，可以避免磁盘 IO 以及无需部署类似 FileBeat、Fluentd 等日志采集工具，减少架构的复杂度。通过网络直接上报，还可以缓冲实现批量压缩上报，降低存储的压力。

接下来就是插件的本体了。

### 全局变量

我们肯定不想将采集到的请求和响应信息分开上报（这对数据的存储和查询带来难度），而是需要先将数据聚合并做部分的统计操作。这就需要在发送请求前将请求的信息记录下来，收到响应之后将二者的信息封装起来。这里就需要借助全局变量了：

```js
pipy({
  _logURL: new URL(config.logURL),
  _request: null,
  _requestTime: 0,
  _responseTime: 0,
})

.export('logger', {
  __logInfo: {},
})
```

这里我们还暴露了模块变量，方便在其他模块中对日志的内容进行扩展。有了全局变量之后，就是在请求和响应时，对数据进行记录了。

### logger.js

先写“骨架”：

```js
//...
.pipeline('request')
.pipeline('response')
```

这是作为插件应用的两个子管道。日志的记录并不是代理的主流程而是作为“旁路”存在，记录的逻辑不能影响主流程的数据，操作的应该是数据的副本。

## 过滤器

### fork 过滤器

*fork* 过滤器是[连接过滤器](/tutorial/04-routing/#连接过滤器)的一种。它可以在子管道中处理当前的上下文和消息，但并不会使用子管道的输出作为其输出，而是将其输入作为输出，这个有点类似管道的分流器。可以理解成只会影响上下文，不会影响当前管道的消息。

*fork* 的使用很简单，只需要指定子管道的名字即可：

```js
//...
.pipeline('request')
  .fork('log-request')

.pipeline('response')
  .fork('log-response')

.pipeline('log-request')

.pipeline('log-response')
```

> 这里需要注意，*fork* 过滤器指定的子管道处理完成之后，才能继续原管道的执行。因此在子管道中，尽量避免做耗时较长的操作，减少对请求性能的影响。

### handleMessage 过滤器

首先是请求的记录

```js
.pipeline('log-request')
  .handleMessageStart(
    () => _requestTime = Date.now()
  )
  .decompressHTTP()
  .handleMessage(
    '256k',
    msg => _request = msg
  )
```

`Date.now()` 用于获取当前时间，在接收到 `MessageStart` 事件时我们记录收到请求的时间，使用 `handleMessageStart` 过滤器处理该事件。

在拿到完整的消息后，通过 `handleMessage` 过滤器记录完整的请求：

* 第一个参数是消息体的最大值（可省略，默认是 `-1`，不做限制）。这里为避免过多的使用内存，设置为 `256k`。
* 第二个参数为回调函数，回调函数的参数为 `Message` 对象。

### merge 过滤器

接下来是响应的记录：

```js
.pipeline('log-response')
  .handleMessageStart(
    () => _responseTime = Date.now()
  )
  .decompressHTTP()
  .replaceMessage(
    '256k',
    msg => (
      new Message(
        JSON.encode({
          req: {
            ..._request.head,
            body: _request.body.toString(),
          },
          res: {
            ...msg.head,
            body: msg.body.toString(),
          },
          reqTime: _requestTime,
          resTime: _responseTime,
          endTime: Date.now(),
          remoteAddr: __inbound.remoteAddress,
          remotePort: __inbound.remotePort,
          localAddr: __inbound.localAddress,
          localPort: __inbound.localPort,
          ...__logInfo,
        }).push('\n')
      )
    )
  )
  .merge('log-send', () => '')

.pipeline('log-send')
```

> 这里数据的封装，使用了展开语法。

在 *log-response* 子管道中，对响应的信息就行记录。此时可以拿到我们构建日志所需要的所有数据，因此在该子管道中将数据封装成 JSON 格式对象，并创建新的 `Message`。

有了新的日志 `Message` 之后，我们就要将其发送出去？每次请求都发送一次，除了对日志存储带来压力外，还会影响请求的性能。假如能够将多个请求的日志缓存起来批量上报？

连接到上报子管道 *log-send* 时，用了 *merge* 过滤器。

*merge* 过滤器也是[连接过滤器](/tutorial/04-routing/#连接过滤器)的一种，在消息的处理方式上有 [muxHTTP 过滤器](/tutorial/03-proxy/#muxhttp-过滤器) 和 *fork* 过滤器的影子。

它与 *muxHTTP* 过滤器一样，会将多个流发送到一个**共享的**管道中处理；但不同的是 *merge* 过滤器不会使用子管道的输出作为其输出，这点与 *fork* 过滤器一样。

*merge* 过滤器的使用与 *muxHTTP* 一样，除了要指定子管道的名字外，也要指定一个 key。关于 key 的描述，可以看 [路由教程中的连接共享问题](/tutorial/04-routing/#连接共享的问题)

### pack 过滤器

*log-send* 子管道会接收父管道发来的消息写入缓冲区，缓冲区有设置大小；假如设置了超时时间，还会启动一个定时器，也就是缓冲区的刷新间隔。

二者有任一达到设定值，缓冲区就会被刷新，数据被发送到指定的目标（这里我们用的 mock 服务）。

```js
.pipeline('log-send')
  .pack(
    1000,
    {
      timeout: 5,
    }
  )
  .replaceMessageStart(
    () => new MessageStart({
      method: 'POST',
      path: _logURL.path,
      headers: {
        'Host': _logURL.host,
        'Content-Type': 'application/json',
      }
    })
  )
  .encodeHTTPRequest()
  .connect(
    () => _logURL.host,
    {
      bufferLimit: '8m',
    }
  )
```

## 测试

正式测试之前需要 mock 一下日志收集服务。

```js
// mock.js
pipy()

.listen(8123)
  .serveHTTP(
    msg => (
      console.log(`body: ${msg.body}`)
    )
  )
```

执行 `pipy mock.js` 运行 mock 服务，然后发送测试请求：

```python
curl http://localhost:8000/ip
You are requesting / from ::ffff:127.0.0.1
```

在 mock 服务的命令行窗口将看到：

```log
body: {"req":{"protocol":"HTTP/1.1","headers":{"host":"localhost:8000","user-agent":"curl/7.77.0","accept":"*/*"},"method":"GET","path":"/","body":""},"res":{"protocol":"HTTP/1.1","headers":{"connection":"keep-alive"},"status":200,"statusText":"OK","body":"You are requesting / from ::ffff:127.0.0.1\n"},"reqTime":1637938843619,"resTime":1637938843620,"endTime":1637938843620,"remoteAddr":"::1","remotePort":53624,"localAddr":"::1","localPort":8000}
```

## 总结

本节的内容比较多，引入了多个新的过滤器。

### 收获

* *use* 过滤器引入插件时，turn down 子管道的执行顺序与插件的引入顺序正好相反。
* *fork* 过滤器的功能，可以将事件的拷贝发送到子管道中进行处理，并且使用过滤器的输入作为输出，不使用子管道的输出。
* *merge* 过滤器与 *muxHTTP* 过滤器一样，会将多个流发送到一个**共享的**管道中处理；但不同的是 *merge* 过滤器不会使用子管道的输出作为其输出，这点与 *fork* 过滤器类似。
* *pack* 过滤器可以对消息进行缓冲，并支持缓冲区大小和定时两种刷新方式。


### 接下来

安全是所有系统都迈不过的那道坎，下一节我们将尝试为代理增加 JWT 的安全校验。